#### Library Imports

```python
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F

from datetime import datetime
from decimal import Decimal
```
#### Template

```python
spark = (
    SparkSession.builder
    .master("local")
    .appName("Section 2.11 - Unionizing Multiple Dataframes")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

sc = spark.sparkContext

import os

data_path = "/data/pets.csv"
base_path = os.path.dirname(os.getcwd())
path = base_path + data_path

pets = spark.read.csv(path, header=True)
pets.toPandas()
```
|   | id | breed_id | nickname | birthday            | age | color |
|---|----|----------|----------|---------------------|-----|-------|
| 0 | 1  | 1        | King     | 2014-11-22 12:30:31 | 5   | brown |
| 1 | 2  | 3        | Argus    | 2016-11-22 10:05:10 | 10  | None  |
| 2 | 3  | 1        | Chewie   | 2016-11-22 10:05:10 | 15  | None  |
| 3 | 3  | 2        | Maple    | 2018-11-22 10:05:10 | 17  | white |
### Unionizing Multiple Dataframes

There are a couple of situations where you would want to perform a union transformation.

#### Case 1: Collecting Data from Various Sources

When you're collecting data from multiple sources, at some point in your Spark application you will need to reconcile the different sources into the same format and work with a single source of truth. This will require you to union the different datasets together.

#### Case 2: Performing Different Transformations on Your Dataset

Sometimes you would like to perform separate transformations on different parts of your data, depending on your task. This involves breaking your dataset up into different parts and working on each part individually. At some point you will want to stitch them back together, and this is again a union operation.
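The split/transform/union pattern from Case 2 can be sketched in plain Python (rows are dicts standing in for dataframe rows; the data and the age threshold here are made up for illustration):

```python
# Plain-Python sketch of the split / transform / union pattern.
pets = [
    {"nickname": "King", "age": 5},
    {"nickname": "Argus", "age": 10},
    {"nickname": "Chewie", "age": 15},
]

# Split the dataset into two parts based on a condition.
young = [row for row in pets if row["age"] < 10]
old = [row for row in pets if row["age"] >= 10]

# Apply a different transformation to each part.
young = [{**row, "group": "young"} for row in young]
old = [{**row, "group": "old"} for row in old]

# Stitch the parts back together -- the "union" step.
combined = young + old
```

In Spark the split would be two `where()` filters, the transformations would be whatever column logic each part needs, and the final step would be `union()` — subject to the column-matching caveats this section is about.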
#### Case 1 - union() (the Wrong Way)

```python
pets_2 = pets.select(
    'breed_id',
    'id',
    'age',
    'color',
    'birthday',
    'nickname'
)

(
    pets
    .union(pets_2)
    .where(F.col('id').isin(1, 2))
    .toPandas()
)
```
|   | id | breed_id | nickname | birthday            | age                 | color |
|---|----|----------|----------|---------------------|---------------------|-------|
| 0 | 1  | 1        | King     | 2014-11-22 12:30:31 | 5                   | brown |
| 1 | 2  | 3        | Argus    | 2016-11-22 10:05:10 | 10                  | None  |
| 2 | 1  | 1        | 5        | brown               | 2014-11-22 12:30:31 | King  |
| 3 | 1  | 3        | 15       | None                | 2016-11-22 10:05:10 | Chewie |
| 4 | 2  | 3        | 17       | white               | 2018-11-22 10:05:10 | Maple |
#### Case 1 - Another Wrong Way

```python
pets_3 = pets.select(
    '*',
    '*'
)

pets_3.show()

(
    pets
    .union(pets_3)
    .where(F.col('id').isin(1, 2))
    .toPandas()
)
```
```
+---+--------+--------+-------------------+---+-----+---+--------+--------+-------------------+---+-----+
| id|breed_id|nickname|           birthday|age|color| id|breed_id|nickname|           birthday|age|color|
+---+--------+--------+-------------------+---+-----+---+--------+--------+-------------------+---+-----+
|  1|       1|    King|2014-11-22 12:30:31|  5|brown|  1|       1|    King|2014-11-22 12:30:31|  5|brown|
|  2|       3|   Argus|2016-11-22 10:05:10| 10| null|  2|       3|   Argus|2016-11-22 10:05:10| 10| null|
|  3|       1|  Chewie|2016-11-22 10:05:10| 15| null|  3|       1|  Chewie|2016-11-22 10:05:10| 15| null|
|  3|       2|   Maple|2018-11-22 10:05:10| 17|white|  3|       2|   Maple|2018-11-22 10:05:10| 17|white|
+---+--------+--------+-------------------+---+-----+---+--------+--------+-------------------+---+-----+
```
```
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<ipython-input-5-c8157f574918> in <module>()
      8 (
      9     pets
---> 10     .union(pets_3)
     11     .where(F.col('id').isin(1,2))
     12     .toPandas()

/usr/local/lib/python2.7/site-packages/pyspark/sql/dataframe.pyc in union(self, other)
   1336         Also as standard in SQL, this function resolves columns by position (not by name).
   1337         """
-> 1338         return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
   1339
   1340     @since(1.3)

/usr/local/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/usr/local/lib/python2.7/site-packages/pyspark/sql/utils.pyc in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u"Union can only be performed on tables with the same number of columns, but the first table has 6 columns and the second table has 12 columns;;\n'Union\n:- Relation[id#10,breed_id#11,nickname#12,birthday#13,age#14,color#15] csv\n+- Project [id#10, breed_id#11, nickname#12, birthday#13, age#14, color#15, id#10, breed_id#11, nickname#12, birthday#13, age#14, color#15]\n   +- Relation[id#10,breed_id#11,nickname#12,birthday#13,age#14,color#15] csv\n"
```
#### What Happened?

Spark will only allow you to union dataframes that have the exact same number of columns and compatible column datatypes, and — as the docstring in the traceback notes — it resolves columns by position, not by name.

**Case 1:** Because we inferred the schema from the csv file, every column is a string, so Spark was able to union the two dataframes. But the result doesn't make sense at all: the columns don't match up.

**Case 2:** We created a new dataframe with twice the number of columns and tried to union it with the original `df`. Spark threw an error, as it doesn't know what to do when the number of columns doesn't match.
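To see concretely why the Case 1 union scrambles the data, here is a plain-Python sketch of positional resolution: the values of a `pets_2` row get re-labelled with the original dataframe's column names, position by position:

```python
# Column order of the original dataframe and of the reordered pets_2.
pets_cols = ["id", "breed_id", "nickname", "birthday", "age", "color"]
pets_2_cols = ["breed_id", "id", "age", "color", "birthday", "nickname"]

# One row of pets_2 (King), stored in pets_2's column order.
row = {"breed_id": "1", "id": "1", "age": "5", "color": "brown",
       "birthday": "2014-11-22 12:30:31", "nickname": "King"}
row_values = [row[c] for c in pets_2_cols]

# union() matches columns by position, so the values end up
# under the original column names -- mostly the wrong ones.
unioned_row = dict(zip(pets_cols, row_values))
# unioned_row["nickname"] is now "5" and unioned_row["color"] is "King",
# exactly the scrambling visible in the output table above.
```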
#### Case 2 - union() (the Right Way)

```python
(
    pets
    .union(pets_2.select(pets.columns))
    .union(pets_3.select(pets.columns))
    .toPandas()
)
```
|    | id | breed_id | nickname | birthday            | age | color |
|----|----|----------|----------|---------------------|-----|-------|
| 0  | 1  | 1        | King     | 2014-11-22 12:30:31 | 5   | brown |
| 1  | 2  | 3        | Argus    | 2016-11-22 10:05:10 | 10  | None  |
| 2  | 3  | 1        | Chewie   | 2016-11-22 10:05:10 | 15  | None  |
| 3  | 3  | 2        | Maple    | 2018-11-22 10:05:10 | 17  | white |
| 4  | 1  | 1        | King     | 2014-11-22 12:30:31 | 5   | brown |
| 5  | 2  | 3        | Argus    | 2016-11-22 10:05:10 | 10  | None  |
| 6  | 3  | 1        | Chewie   | 2016-11-22 10:05:10 | 15  | None  |
| 7  | 3  | 2        | Maple    | 2018-11-22 10:05:10 | 17  | white |
| 8  | 1  | 1        | King     | 2014-11-22 12:30:31 | 5   | brown |
| 9  | 2  | 3        | Argus    | 2016-11-22 10:05:10 | 10  | None  |
| 10 | 3  | 1        | Chewie   | 2016-11-22 10:05:10 | 15  | None  |
| 11 | 3  | 2        | Maple    | 2018-11-22 10:05:10 | 17  | white |
#### What Happened?

The columns match perfectly! How? For each new dataframe that you would like to union with the original `df`, you select the original `df`'s columns during the union. This:

- Guarantees the ordering of the columns, as `select` returns columns in the order in which they are listed.
- Guarantees that only the columns of the original `df` are selected; from the previous sections, we know that `select` returns only the specified columns.
#### Summary

- Always, always be careful when you are `union`ing `df`s together.
- When you `union` `df`s together, you should ensure that:
    - The number of columns is the same.
    - The columns themselves are the same.
    - The columns are in the same order.